package com.hanxiaozhang.example.listener.mgsconsumer;

import com.hanxiaozhang.constant.RocketConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.util.List;

/**
 * 〈一句话功能简述〉<br>
 * 〈手动确认消息，在并发消费模式下〉
 * <p>
 * todo
 *
 * @author hanxinghua
 * @create 2022/10/5
 * @since 1.0.0
 */
@Slf4j
@Component
public class No10ManualConfirmConsumeOriginalSyntaxListener {


    @PostConstruct
    public void init() {
        DefaultMQPushConsumer consumer = null;

        try {
            // 实例化消费者
            consumer = new DefaultMQPushConsumer(RocketConstant.MANUAL_CONFIRM_CONSUMER_GROUP);
            // 设置NameServer的地址
            consumer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
            // 订阅一个或多个Topic，用Tag来过滤需要消费的消息，这里指定*表示接收所有Tag的消息
            consumer.subscribe(RocketConstant.MANUAL_CONFIRM_TOPIC, "*");
            // 注册回调实现类来处理从broker拉取回来的消息

            // 注册监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                // 消费消息的方法
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    /*
                    -1：表示直接发往死信队列，不经过重试队列，发送的默认死信队列topic名称为%DLQ%+消费者组名
                     0：表示每次按照下面定义的时间依次递增,第一次为1s,第二次为5s...
                       1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                    >0：表示每次重试的时间间隔，由我们用户自定义，1表示重试间隔为1s、2表示5s、3表示10秒、依次递增,重试次数由配置consumer.setMaxReconsumeTimes(10)决定
                        发送的默认重试队列topic名称为%RETRY%+消费者组名。
                     */
                    // 表示重试间隔为1s
                    context.setDelayLevelWhenNextConsume(1);
                    MessageExt msg = msgs.get(0);
                    try {
                        String msgBody = new String(msg.getBody(), "utf-8");
                        log.info("{}收到消息：{}", this.getClass().getSimpleName(), msgBody);
                        if ("{\"name\":\"0\"}".equals(msgBody)) {
                            // 模拟失败
                            int i = 1 / 0;
                            System.out.println(i);
                        }
                    } catch (Exception e) {
                        log.info("当前时间:{},最大重试次数为：{}", LocalDateTime.now(), msgs.get(0).getReconsumeTimes());
                        if (msgs.get(0).getReconsumeTimes() > 3) {
                            // 重试大于3次直接发往死信队列
                            context.setDelayLevelWhenNextConsume(-1);
                        }
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 开启消费者
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        } finally {

        }
    }

}